/**
 * 
 */
package io.potato.task;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.potato.core.util.TableUtil;
import io.potato.ts.common.Constants;
import io.potato.ts.domain.SmsSendingRecord;
import io.potato.ts.service.SmsSendingRecordService;
import io.potato.ts.service.SmsService;
import io.potato.ts.service.UserService;
import lombok.extern.slf4j.Slf4j;

/**
 * 发送短信调度
 * @author timl
 * created: 2019年1月9日 下午6:00:53
 */

@Component
@Slf4j
public class SendSmsTask {
	
	@Autowired
	SmsService smsService;
	
	@Autowired
	UserService userService;
	
	@Autowired
	SmsSendingRecordService ssr;
	
	/**
	 * 发送短信的线程池
	 */
	ExecutorService sendSmsExecutor = Executors.newFixedThreadPool(40);
	
	public void runAPITask(int start, int end) {
		for (int i = start; i <= end; i++) {
			final int seq = i;
			sendSmsExecutor.execute(() -> {
				log.warn("init api send task " + seq);
				sendSmsAtTime(seq, Constants.SENDER_TYPE_API);
			}); 
		}
	}
	
	public void runWebTask() {
		for (int i = 0; i < Constants.SEND_WEB_SMS_NUM; i++) {
			final int seq = i;
			sendSmsExecutor.execute(() -> {
				log.warn("init web send task " + seq);
				sendSmsAtTime(seq, Constants.SENDER_TYPE_WEB);
			}); 
		}
	}
	
	public void runReSendTask() {
		
		// web sms 重发
		sendSmsExecutor.execute(() -> {
			log.warn("init web resend task");
			reSendFailed(Constants.SENDER_TYPE_WEB, new int[] {1, 2, 3, 4, 5, 6}, 0, 0, 4000);
		}); 
		
		// api sms 重发
		sendSmsExecutor.execute(() -> {
			log.warn("init resend task 1 - 0 ");
			reSendFailed(Constants.SENDER_TYPE_API, new int[] {1}, 0, 9, 4000);
		}); 
		
		sendSmsExecutor.execute(() -> {
			log.warn("init resend task 1 - 1 ");
			reSendFailed(Constants.SENDER_TYPE_API, new int[] {1}, 10, 19, 4000);
		}); 
		
		sendSmsExecutor.execute(() -> {
			log.warn("init resend task 2");
			reSendFailed(Constants.SENDER_TYPE_API, new int[] {2}, 0, 19, 6 * 1000);
		}); 
		
		sendSmsExecutor.execute(() -> {
			log.warn("init resend task 3, 4 ");
			reSendFailed(Constants.SENDER_TYPE_API, new int[] {3, 4}, 0, 19, 20 * 1000);
		}); 
		
		sendSmsExecutor.execute(() -> {
			log.warn("init resend task 5 ~ 10 ");
			reSendFailed(Constants.SENDER_TYPE_API, new int[] {5, 6, 7, 8}, 0, 19, 100 * 1000);
		}); 
	}
	
	/**
	 * 查询到期的定时短信并发送
	 */
	private void sendSmsAtTime(int seq, int senderType) {
		sleep(5000 + 100 * seq);
		final String table = TableUtil.getSendingRecordTable(senderType, seq);
		log.warn("read data from table " + table);
		while (true) {
			try {
				LocalDateTime now = LocalDateTime.now();
				// 查询待发送记录
				List<SmsSendingRecord> toSendList = 
						Constants.SENDER_TYPE_API.equals(senderType) 
						? ssr.findRecordToSend(now, table) 
					    : ssr.findRecordToSend(now, seq, table);
				
				if (toSendList == null || toSendList.isEmpty()) {
					sleep(480);
					continue;
				}
				long start = System.currentTimeMillis();
				log.debug("start to send " + table + " for " + toSendList.size() + " sms");
				// 发送短信
				smsService.sendSms(toSendList, senderType);
				long dur = System.currentTimeMillis() - start;
				log.debug("finish to send " + table + " for " + toSendList.size() + " sms IN " + dur + " MS");
			} catch (Throwable e) {
				log.error("", e);
				sleep(800);
			}
		}
	}
	
	private void reSendFailed(int senderType, int[] tryTimeArr, int startSeq, int endSeq, long sleep) {
		sleep(3000);
		//String tryTimeStr = Strings.join(tryTimeArr, ',');
		while (true) {
			try {
				//log.debug("start to check resend sms for " + tryTimeStr + ", " + startSeq + " ~ " + endSeq  );
				int totalSendCount = 0;
				for (int seq = startSeq; seq <= endSeq; seq++) {
					String table = TableUtil.getSendingRecordTable(senderType, seq);
					for (int tryTimes : tryTimeArr) {
						int dur = this.getRetryDuration(tryTimes);
						LocalDateTime now = LocalDateTime.now();
						LocalDateTime endTime = now.minusSeconds(dur);
						// 查询待发送记录
						
						List<SmsSendingRecord> toSendList =this.ssr.findFailedToSend(now, tryTimes, endTime, table);
						if (toSendList != null && toSendList.size() > 0) {
							log.info(senderType + " Resend for trytimes: " + tryTimes + " dur: " + dur + "seq: " + seq + " , total sms is " + toSendList.size());
							totalSendCount += toSendList.size();
							smsService.sendSms(toSendList, senderType);
							log.info(senderType + " Finish for trytimes: " + tryTimes + " dur: " + dur + "seq: " + seq + " , total sms is " + toSendList.size());
						}
					}
				}
				
				if (totalSendCount == 0) {
					sleep(sleep);
				}
			} catch (Throwable e) {
				log.error("", e);
				sleep(800);
			}
		}
	}
	
	private int getRetryDuration(int tryTimes) {
		if (tryTimes == 1) {
			return 30;  // 30秒
		} else if (tryTimes == 2) {
			return 60;  // 60秒
		} else if (tryTimes == 3) {
			return 300;  // 5分钟
		} else if (tryTimes == 4) {
			return 600;  // 10分钟
		} else if ( tryTimes == 5) {
			return 1800;  // 半个小时
		} else  {
			// 第六次开始每个小时重试一次
			return (tryTimes - 5) * 3600;
		} 
	}
	
	private void sleep(long ms) {
		try {
			Thread.sleep(ms);
		} catch (InterruptedException e) {
			// should not go here
		}
	}
	
}
